package com.deep.cdc;

import com.deep.bean.ConnectConfig;
import com.deep.bean.TransformConfig;
import com.deep.common.ArgsConfig;
import com.deep.common.ArgsUtil;
import com.deep.common.ProfileConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SyncQsRec {
    public static void main(String[] args) {
        ArgsConfig argsConfig = ArgsUtil.getMap(args);
        Integer recid = argsConfig.getRecid();

        TransformConfig transformConfig = ProfileConfig.getTransformConfig(argsConfig.getProfile());
        ConnectConfig sourceConfig = transformConfig.getSource();
        ConnectConfig targetConfig = transformConfig.getTarget();


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //创建用于输出的表
        String createInputDDL = "create table source_t_qs_rec " +
                "(" +
                "RECID  int , " +
                "LASTUPDATETIME  timestamp(0) , " +
                "ACTID  int , " +
                "ELAPSEDTIME  string , " +
                "TASKNUM  string , " +
                "DISTRICTCODE  string , " +
                "STREETCODE  string , " +
                "COMMUNITYCODE  string , " +
                "CELLCODE  string , " +
                "RECSRCID  int , " +
                "RECTYPEID  int , " +
                "SUBTYPEID  int , " +
                "MAINTYPEID  int , " +
                "RECSTATEID  int , " +
                "DUTYGRIDID  int , " +
                "CREATETIME  timestamp(0) , " +
                "RECDESC  string , " +
                "ADDRESS  string , " +
                "COORDX  double , " +
                "COORDY  double , " +
                "RECSRCNAME  string , " +
                "RECTYPENAME  string , " +
                "MAINTYPENAME  string , " +
                "SUBTYPENAME  string , " +
                "RECSTATENAME  string , " +
                "DISTRICTNAME  string , " +
                "STREETNAME  string , " +
                "COMMUNITYNAME  string , " +
                "CELLNAME  string , " +
                "DUTYGRIDNAME  string , " +
                "RECGRADENAME  string , " +
                "RECLEVELNAME  string , " +
                "ACTPROPERTYID  int , " +
                "ACTPROPERTYNAME  string , " +
                "REPORTERNAME  string , " +
                "CALLTEL  string , " +
                "CALLTIME  timestamp(0) , " +
                "PATROLID  int , " +
                "VERIFYPATROLID  int , " +
                "CHECKPATROLID  int , " +
                "PATROLNAME  string , " +
                "VERIFYPATROLNAME  string , " +
                "CHECKPATROLNAME  string , " +
                "REPORTCOUNT  int , " +
                "REPORTCOUNTBYPATROL  int , " +
                "REPORTCOUNTBYREPORTER  int , " +
                "REPORTCOUNTVALID  int , " +
                "REPORTCOUNTVALIDBYPATROL  int , " +
                "REPORTCOUNTVALIDBYREPORTER  int , " +
                "VERIFYMSGCOUNT  int , " +
                "VERIFYMSGCOUNTSENT  int , " +
                "VERIFYMSGCOUNTTOSEND  int , " +
                "VERIFIEDMSGCOUNTSENT  int , " +
                "VERIFIEDMSGCOUNT  int , " +
                "RECEIVEDCOUNT  int , " +
                "RECEIVINGCOUNT  int , " +
                "UNRECEIVEDCOUNT  int , " +
                "RECEIVEDONTIME  int , " +
                "REGISTERCOUNT  int , " +
                "REGISTEREDCOUNT  int , " +
                "REGISTERCOUNTVAILD  int , " +
                "REGISTERINGCOUNT  int , " +
                "registeredCountOverTime  int , " +
                "UNREGISTERCOUNT  int , " +
                "REGISTEREDCOUNTONTIME  int , " +
                "DISPATCHCOUNT  int , " +
                "DISPATCHEDCOUNT  int , " +
                "DISPATCHINGCOUNT  int , " +
                "DISPATCHEDCOUNTONTIME  int , " +
                "DISPATCHEDCOUNTCORRECT  int , " +
                "DISPATCHEDCOUNTERROR  int , " +
                "PROCESSCOUNT  int , " +
                "PROCESSEDCOUNT  int , " +
                "PROCESSEDCOUNTONTIME  int , " +
                "PROCESSEDCOUNTOVERTIME  int , " +
                "PROCESSINGCOUNTOVERTIME  int , " +
                "PROCESSINGCOUNT  int , " +
                "CHECKMSGCOUNT  int , " +
                "CHECKMSGCOUNTSENT  int , " +
                "CHECKMSGCOUNTTOSEND  int , " +
                "CHECKEDCOUNT  int , " +
                "CHECKEDCOUNTONTIME  int , " +
                "CHECKCOUNT  int , " +
                "CHECKRESULT  int , " +
                "CHECKTIMES  int , " +
                "CHECKTIMECOST  double , " +
                "COMPLETECOUNT  int , " +
                "COMPLETEDCOUNT  int , " +
                "COMPLETEDCOUNTONTIME  int , " +
                "COMPLETEDCOUNTOVERTIME  int , " +
                "COMPLETINGCOUNTONTIME  int , " +
                "COMPLETINGCOUNTOVERTIME  int , " +
                "REPROCESSCOUNT  int , " +
                "REPROCESSTIMES  int , " +
                "PROCESSSUSPENDCOUNT  int , " +
                "CANCELCOUNT  int , " +
                "PROCESSDELAYCOUNT  int , " +
                "RECEIVESTART  timestamp(0) , " +
                "RECEIVEEND  timestamp(0) , " +
                "RECEIVELIMIT  double , " +
                "RECEIVEDEADLINE  timestamp(0) , " +
                "REGISTERSTART  timestamp(0) , " +
                "REGISTEREND  timestamp(0) , " +
                "REGISTERLIMIT  double , " +
                "REGISTERDEADLINE  timestamp(0) , " +
                "DISPATCHENDFIRST  timestamp(0) , " +
                "DISPATCHENDLAST  timestamp(0) , " +
                "DISPATCHLIMIT  double , " +
                "DISPATCHDEADLINE  timestamp(0) , " +
                "RESPONSIBILITYUNITID  int , " +
                "RESPONSIBILITYUNITNAME  string , " +
                "RESPONSIBILITYUNITABBR  string , " +
                "RESPONSIBILITYUNITREGION  string , " +
                "PROCESSLIMIT  double , " +
                "PROCESSTIMEUNIT  int , " +
                "PROCESSSTART  timestamp(0) , " +
                "PROCESSEND  timestamp(0) , " +
                "PROCESSDEADLINE  timestamp(0) , " +
                "PROCESSTIMECOST  double , " +
                "PROCESSDELAYTIMECOST  double , " +
                "SUPERVISESTART  timestamp(0) , " +
                "SUPERVISELIMIT  double , " +
                "CONFIRMSTART  timestamp(0) , " +
                "CONFIRMEND  timestamp(0) , " +
                "CONFIRMLIMIT  double , " +
                "CHECKEND  timestamp(0) , " +
                "CHECKLIMIT  double , " +
                "COMPLETESTART  timestamp(0) , " +
                "REPROCESSSTART  timestamp(0) , " +
                "COMPLETEEND  timestamp(0) , " +
                "COMPLETELIMIT  double , " +
                "COMPLETEDEADLINE  timestamp(0) , " +
                "CANCELTIME  timestamp(0) , " +
                "ACTPARNAME  string , " +
                "ACTPARTUNITNAME  string , " +
                "CHECKMSGTIMES  int , " +
                "CONFRIMMSGTIMES  int , " +
                "PREREGISTERPARTID  int , " +
                "PREREGISTERPARTNAME  string , " +
                "PREREGISTERPARTROLEID  int , " +
                "CHECKPARTID  int , " +
                "CHECKPARTNAME  string , " +
                "CHECKPARTROLEID  int , " +
                "VERIFYPARTID  int , " +
                "VERIFYPARTNAME  string , " +
                "VERIFYPARTROLEID  int , " +
                "RECEIVEPARTID  int , " +
                "RECEIVEPARTNAME  string , " +
                "RECEIVEPARTROLEID  int , " +
                "REGISTERPARTID  int , " +
                "REGISTERPARTNAME  string , " +
                "REGISTERPARTROLEID  int , " +
                "CANCELPARTID  int , " +
                "CANCELPARTNAME  string , " +
                "CANCELPARTROLEID  int , " +
                "RETURNPARTID  int , " +
                "RETURNPARTNAME  string , " +
                "RETURNPARTROLEID  int , " +
                "RETURNOPINION  string , " +
                "COMPLETEPARTID  int , " +
                "COMPLETEPARTNAME  string , " +
                "COMPLETEPARTROLEID  int , " +
                "DISPATCHPARTID  int , " +
                "DISPATCHPARTNAME  string , " +
                "DISPATCHPARTROLEID  int , " +
                "DISTDISPATCHPARTID  int , " +
                "DISTDISPATCHPARTNAME  string , " +
                "DISTDISPATCHPARTROLEID  int , " +
                "SUPERVISEPARTID  int , " +
                "SUPERVISEPARTNAME  string , " +
                "SUPERVISEPARTROLEID  int , " +
                "DELAYPARTID  int , " +
                "DELAYPARTNAME  string , " +
                "DELAYPARTROLEID  int , " +
                "SUSPENDPARTID  int , " +
                "SUSPENDPARTNAME  string , " +
                "SUSPENDPARTROLEID  int , " +
                "CANCELOPINION  string , " +
                "SUPERVISEOPINION  string , " +
                "PROCESSOPINION  string , " +
                "DISPATCHOPINION  string , " +
                "REGISTEROPINION  string , " +
                "UNRECEIVEOPINION  string , " +
                "RECEIVEOPINION  string , " +
                "COMPLETEOPINION  string , " +
                "CONFIRMOPINION  string , " +
                "VERIFIEDMSGCOUNTSENTONTIME  int , " +
                "DISTDISPATCHCOUNT  int , " +
                "DISTDISPATCHEDCOUNT  int , " +
                "DISTDISPATCHINGCOUNT  int , " +
                "DISTDISPATCHEDCOUNTONTIME  int , " +
                "DISTDISPATCHEDCOUNTCORRECT  int , " +
                "DISTDISPATCHEDCOUNTERROR  int , " +
                "DISTRESPPARENTUNITID  int , " +
                "DISTRESPPARENTUNITNAME  string , " +
                "RESPONSIBILITYLEVEL  int , " +
                "DISPATCHPARTREGIONID  int , " +
                "DISTDISPATCHPARTREGIONID  int , " +
                "SELFPROCESSCOUNT  int , " +
                "SUPERVISEPROCESSCOUNT  int , " +
                "ACTDEFID  int , " +
                "ACTDEFNAME  string , " +
                "VERIFYSTART  timestamp(0) , " +
                "VERIFYEND  timestamp(0) , " +
                "VERIFYLIMIT  double , " +
                "AUTOCOMPLETEDCOUNT  int , " +
                "REPROCESSUNPASSCOUNT  int , " +
                "RECPLATFORM  int , " +
                "SIMILARREGISTEREDCOUNT  int , " +
                "CITYACCDELAYCOUNT  int , " +
                "DISTACCDELAYCOUNT  int , " +
                "DISTACCROLLBACKCOUNT  int , " +
                "CITYACCDELAYCOUNTONTIME  int , " +
                "DISTACCDELAYCOUNTONTIME  int , " +
                "DISTACCROLLBACKCOUNTONTIME  int , " +
                "CHECKMSGCOUNTSENTERROR  int , " +
                "evaluateOrderScore  int , " +
                "evaluateOrderScoreCount  int , " +
                "evaluateScore  int , " +
                "evaluateScoreCount  int , " +
                "leaderSupersiveStateID  int , " +
                "auditStatus  int , " +
                "ESTABLISHCONDID  int , " +
                "ESTABLISHCONDNAME  string , " +
                "hasOvertimeCount  int , " +
                "auditBackTime  timestamp(0) , " +
                "auditPassTime  timestamp(0) , " +
                "evaluateOrderTime  timestamp(0) , " +
                "evaluateOrderUserId  int , " +
                "evaluateOrderUserName  string , " +
                "processRolePartID  int , " +
                "processRolePartName  string , " +
                "mobile  string , " +
                "workOrderNo  string , " +
                "aacceptorNo  string , " +
                "auditTime  timestamp(0) , " +
                "auditCount  int , " +
                "questionType  string , " +
                "auditUserId  int , " +
                "auditUserName  string , " +
                "CHECKMSGSTATEID  int , " +
                "appealtype  int , " +
                "iscomponent  int ," +
                "    PRIMARY KEY (`RECID`) NOT ENFORCED" +
                ")WITH (" +
                "    'connector' = 'mysql-cdc'," +
                "    'hostname' = '" + sourceConfig.getHostname() + "',  " +
                "    'port' = '" + sourceConfig.getPort() + "',   " +
                "    'username' = '" + sourceConfig.getUsername() + "',  " +
                "    'password' = '" + sourceConfig.getPassword() + "',   " +
                "    'database-name' = '" + sourceConfig.getDatabaseName() + "',  " +
                "    'table-name' = 't_qs_rec'," +
                "   'debezium.skipped.operations'='d'" +
                ") ";
        tableEnv.executeSql(createInputDDL);


        //创建用于输出的表
        String createOutputDDL = "create table target_t_qs_rec " +
                "(" +
                "RECID  int , " +
                "LASTUPDATETIME  timestamp(0) , " +
                "ACTID  int , " +
                "ELAPSEDTIME  string , " +
                "TASKNUM  string , " +
                "DISTRICTCODE  string , " +
                "STREETCODE  string , " +
                "COMMUNITYCODE  string , " +
                "CELLCODE  string , " +
                "RECSRCID  int , " +
                "RECTYPEID  int , " +
                "SUBTYPEID  int , " +
                "MAINTYPEID  int , " +
                "RECSTATEID  int , " +
                "DUTYGRIDID  int , " +
                "CREATETIME  timestamp(0) , " +
                "RECDESC  string , " +
                "ADDRESS  string , " +
                "COORDX  double , " +
                "COORDY  double , " +
                "RECSRCNAME  string , " +
                "RECTYPENAME  string , " +
                "MAINTYPENAME  string , " +
                "SUBTYPENAME  string , " +
                "RECSTATENAME  string , " +
                "DISTRICTNAME  string , " +
                "STREETNAME  string , " +
                "COMMUNITYNAME  string , " +
                "CELLNAME  string , " +
                "DUTYGRIDNAME  string , " +
                "RECGRADENAME  string , " +
                "RECLEVELNAME  string , " +
                "ACTPROPERTYID  int , " +
                "ACTPROPERTYNAME  string , " +
                "REPORTERNAME  string , " +
                "CALLTEL  string , " +
                "CALLTIME  timestamp(0) , " +
                "PATROLID  int , " +
                "VERIFYPATROLID  int , " +
                "CHECKPATROLID  int , " +
                "PATROLNAME  string , " +
                "VERIFYPATROLNAME  string , " +
                "CHECKPATROLNAME  string , " +
                "REPORTCOUNT  int , " +
                "REPORTCOUNTBYPATROL  int , " +
                "REPORTCOUNTBYREPORTER  int , " +
                "REPORTCOUNTVALID  int , " +
                "REPORTCOUNTVALIDBYPATROL  int , " +
                "REPORTCOUNTVALIDBYREPORTER  int , " +
                "VERIFYMSGCOUNT  int , " +
                "VERIFYMSGCOUNTSENT  int , " +
                "VERIFYMSGCOUNTTOSEND  int , " +
                "VERIFIEDMSGCOUNTSENT  int , " +
                "VERIFIEDMSGCOUNT  int , " +
                "RECEIVEDCOUNT  int , " +
                "RECEIVINGCOUNT  int , " +
                "UNRECEIVEDCOUNT  int , " +
                "RECEIVEDONTIME  int , " +
                "REGISTERCOUNT  int , " +
                "REGISTEREDCOUNT  int , " +
                "REGISTERCOUNTVAILD  int , " +
                "REGISTERINGCOUNT  int , " +
                "registeredCountOverTime  int , " +
                "UNREGISTERCOUNT  int , " +
                "REGISTEREDCOUNTONTIME  int , " +
                "DISPATCHCOUNT  int , " +
                "DISPATCHEDCOUNT  int , " +
                "DISPATCHINGCOUNT  int , " +
                "DISPATCHEDCOUNTONTIME  int , " +
                "DISPATCHEDCOUNTCORRECT  int , " +
                "DISPATCHEDCOUNTERROR  int , " +
                "PROCESSCOUNT  int , " +
                "PROCESSEDCOUNT  int , " +
                "PROCESSEDCOUNTONTIME  int , " +
                "PROCESSEDCOUNTOVERTIME  int , " +
                "PROCESSINGCOUNTOVERTIME  int , " +
                "PROCESSINGCOUNT  int , " +
                "CHECKMSGCOUNT  int , " +
                "CHECKMSGCOUNTSENT  int , " +
                "CHECKMSGCOUNTTOSEND  int , " +
                "CHECKEDCOUNT  int , " +
                "CHECKEDCOUNTONTIME  int , " +
                "CHECKCOUNT  int , " +
                "CHECKRESULT  int , " +
                "CHECKTIMES  int , " +
                "CHECKTIMECOST  double , " +
                "COMPLETECOUNT  int , " +
                "COMPLETEDCOUNT  int , " +
                "COMPLETEDCOUNTONTIME  int , " +
                "COMPLETEDCOUNTOVERTIME  int , " +
                "COMPLETINGCOUNTONTIME  int , " +
                "COMPLETINGCOUNTOVERTIME  int , " +
                "REPROCESSCOUNT  int , " +
                "REPROCESSTIMES  int , " +
                "PROCESSSUSPENDCOUNT  int , " +
                "CANCELCOUNT  int , " +
                "PROCESSDELAYCOUNT  int , " +
                "RECEIVESTART  timestamp(0) , " +
                "RECEIVEEND  timestamp(0) , " +
                "RECEIVELIMIT  double , " +
                "RECEIVEDEADLINE  timestamp(0) , " +
                "REGISTERSTART  timestamp(0) , " +
                "REGISTEREND  timestamp(0) , " +
                "REGISTERLIMIT  double , " +
                "REGISTERDEADLINE  timestamp(0) , " +
                "DISPATCHENDFIRST  timestamp(0) , " +
                "DISPATCHENDLAST  timestamp(0) , " +
                "DISPATCHLIMIT  double , " +
                "DISPATCHDEADLINE  timestamp(0) , " +
                "RESPONSIBILITYUNITID  int , " +
                "RESPONSIBILITYUNITNAME  string , " +
                "RESPONSIBILITYUNITABBR  string , " +
                "RESPONSIBILITYUNITREGION  string , " +
                "PROCESSLIMIT  double , " +
                "PROCESSTIMEUNIT  int , " +
                "PROCESSSTART  timestamp(0) , " +
                "PROCESSEND  timestamp(0) , " +
                "PROCESSDEADLINE  timestamp(0) , " +
                "PROCESSTIMECOST  double , " +
                "PROCESSDELAYTIMECOST  double , " +
                "SUPERVISESTART  timestamp(0) , " +
                "SUPERVISELIMIT  double , " +
                "CONFIRMSTART  timestamp(0) , " +
                "CONFIRMEND  timestamp(0) , " +
                "CONFIRMLIMIT  double , " +
                "CHECKEND  timestamp(0) , " +
                "CHECKLIMIT  double , " +
                "COMPLETESTART  timestamp(0) , " +
                "REPROCESSSTART  timestamp(0) , " +
                "COMPLETEEND  timestamp(0) , " +
                "COMPLETELIMIT  double , " +
                "COMPLETEDEADLINE  timestamp(0) , " +
                "CANCELTIME  timestamp(0) , " +
                "ACTPARNAME  string , " +
                "ACTPARTUNITNAME  string , " +
                "CHECKMSGTIMES  int , " +
                "CONFRIMMSGTIMES  int , " +
                "PREREGISTERPARTID  int , " +
                "PREREGISTERPARTNAME  string , " +
                "PREREGISTERPARTROLEID  int , " +
                "CHECKPARTID  int , " +
                "CHECKPARTNAME  string , " +
                "CHECKPARTROLEID  int , " +
                "VERIFYPARTID  int , " +
                "VERIFYPARTNAME  string , " +
                "VERIFYPARTROLEID  int , " +
                "RECEIVEPARTID  int , " +
                "RECEIVEPARTNAME  string , " +
                "RECEIVEPARTROLEID  int , " +
                "REGISTERPARTID  int , " +
                "REGISTERPARTNAME  string , " +
                "REGISTERPARTROLEID  int , " +
                "CANCELPARTID  int , " +
                "CANCELPARTNAME  string , " +
                "CANCELPARTROLEID  int , " +
                "RETURNPARTID  int , " +
                "RETURNPARTNAME  string , " +
                "RETURNPARTROLEID  int , " +
                "RETURNOPINION  string , " +
                "COMPLETEPARTID  int , " +
                "COMPLETEPARTNAME  string , " +
                "COMPLETEPARTROLEID  int , " +
                "DISPATCHPARTID  int , " +
                "DISPATCHPARTNAME  string , " +
                "DISPATCHPARTROLEID  int , " +
                "DISTDISPATCHPARTID  int , " +
                "DISTDISPATCHPARTNAME  string , " +
                "DISTDISPATCHPARTROLEID  int , " +
                "SUPERVISEPARTID  int , " +
                "SUPERVISEPARTNAME  string , " +
                "SUPERVISEPARTROLEID  int , " +
                "DELAYPARTID  int , " +
                "DELAYPARTNAME  string , " +
                "DELAYPARTROLEID  int , " +
                "SUSPENDPARTID  int , " +
                "SUSPENDPARTNAME  string , " +
                "SUSPENDPARTROLEID  int , " +
                "CANCELOPINION  string , " +
                "SUPERVISEOPINION  string , " +
                "PROCESSOPINION  string , " +
                "DISPATCHOPINION  string , " +
                "REGISTEROPINION  string , " +
                "UNRECEIVEOPINION  string , " +
                "RECEIVEOPINION  string , " +
                "COMPLETEOPINION  string , " +
                "CONFIRMOPINION  string , " +
                "VERIFIEDMSGCOUNTSENTONTIME  int , " +
                "DISTDISPATCHCOUNT  int , " +
                "DISTDISPATCHEDCOUNT  int , " +
                "DISTDISPATCHINGCOUNT  int , " +
                "DISTDISPATCHEDCOUNTONTIME  int , " +
                "DISTDISPATCHEDCOUNTCORRECT  int , " +
                "DISTDISPATCHEDCOUNTERROR  int , " +
                "DISTRESPPARENTUNITID  int , " +
                "DISTRESPPARENTUNITNAME  string , " +
                "RESPONSIBILITYLEVEL  int , " +
                "DISPATCHPARTREGIONID  int , " +
                "DISTDISPATCHPARTREGIONID  int , " +
                "SELFPROCESSCOUNT  int , " +
                "SUPERVISEPROCESSCOUNT  int , " +
                "ACTDEFID  int , " +
                "ACTDEFNAME  string , " +
                "VERIFYSTART  timestamp(0) , " +
                "VERIFYEND  timestamp(0) , " +
                "VERIFYLIMIT  double , " +
                "AUTOCOMPLETEDCOUNT  int , " +
                "REPROCESSUNPASSCOUNT  int , " +
                "RECPLATFORM  int , " +
                "SIMILARREGISTEREDCOUNT  int , " +
                "CITYACCDELAYCOUNT  int , " +
                "DISTACCDELAYCOUNT  int , " +
                "DISTACCROLLBACKCOUNT  int , " +
                "CITYACCDELAYCOUNTONTIME  int , " +
                "DISTACCDELAYCOUNTONTIME  int , " +
                "DISTACCROLLBACKCOUNTONTIME  int , " +
                "CHECKMSGCOUNTSENTERROR  int , " +
                "evaluateOrderScore  int , " +
                "evaluateOrderScoreCount  int , " +
                "evaluateScore  int , " +
                "evaluateScoreCount  int , " +
                "leaderSupersiveStateID  int , " +
                "auditStatus  int , " +
                "ESTABLISHCONDID  int , " +
                "ESTABLISHCONDNAME  string , " +
                "hasOvertimeCount  int , " +
                "auditBackTime  timestamp(0) , " +
                "auditPassTime  timestamp(0) , " +
                "evaluateOrderTime  timestamp(0) , " +
                "evaluateOrderUserId  int , " +
                "evaluateOrderUserName  string , " +
                "processRolePartID  int , " +
                "processRolePartName  string , " +
                "mobile  string , " +
                "workOrderNo  string , " +
                "aacceptorNo  string , " +
                "auditTime  timestamp(0) , " +
                "auditCount  int , " +
                "questionType  string , " +
                "auditUserId  int , " +
                "auditUserName  string , " +
                "CHECKMSGSTATEID  int , " +
                "appealtype  int , " +
                "iscomponent  int ," +
                "    PRIMARY KEY (`RECID`) NOT ENFORCED" +
                ")WITH (" +
                " 'connector' = 'jdbc'," +
                " 'driver' = 'com.mysql.cj.jdbc.Driver'," +
                " 'url' = 'jdbc:mysql://"+ targetConfig.getHostname() + ":" + targetConfig.getPort() + "/" + targetConfig.getDatabaseName() + "?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC',  " +
                " 'username' = '" + targetConfig.getUsername() + "',  " +
                "  'password' = '" + targetConfig.getPassword() + "',   " +
                "  'table-name' = 't_qs_rec'," +
                "  'connection.max-retry-timeout' = '60s' " +
                ")";
        tableEnv.executeSql(createOutputDDL);


        TableResult tableResult = tableEnv.executeSql("insert into target_t_qs_rec select * from source_t_qs_rec where RECID >  " + recid);

        tableResult.print();

    }
}
